Manual Processing of CSV Files in Python¶
By Cesar Perez
Introduction.¶
The goal of this exercise is to write code capable of:
- Open csv/txt files.
- Summarize data by a set of given parameters.
- Output the summary as a dictionary.
The exercise is focused on string manipulation and data structures, so I won't use common data manipulation libraries such as Pandas or the built-in csv module. In situations where memory is limited but large datasets need to be analyzed, the proposed function can serve as a workaround.
For demonstration, I'm using the open dataset supermarket-sales.csv, which is available for download on Kaggle.
Defining Functions.¶
To achieve the goal defined earlier, I defined multiple functions. Here is an overview:
- csv2summary. This is the main function; it opens the csv file, iterates over its rows, and produces the final summary using the rest of the functions below.
- transform_dict. Transforms a dictionary that stores the relevant csv values into a nested structure.
- dict_addition. In charge of the cumulative operations.
- filter_values. In charge of deciding whether a row should be ignored based on the user's given parameters.
- sum_values. Performs a sum of values.
- mean_values. Returns a tuple holding the sum, count, and mean of values.
- adjust_line. Detects delimiter characters sitting between quotation characters, which could break the csv integrity, and removes them.
transform_dict.¶
If we want to summarize a csv file, we should start by relating columns and values through dictionaries, where the key represents the column/field name and the value represents the content of a cell or a column. We can learn the column sequence by reading the first row of the csv or by introducing it as a list.
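For illustration (a small sketch, not part of the original notebook; the header and cell values are borrowed from the example further below), a single row can be related to its columns like this:
headers = ('Branch', 'Customer type', 'gross income', 'Rating')
cells = ('A', 'Member', '30.91', '6.6')
row = dict(zip(headers, cells))  # column name -> cell content
print(row)
# {'Branch': 'A', 'Customer type': 'Member', 'gross income': '30.91', 'Rating': '6.6'}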
Starting from this basic structure, we can summarize the data by creating a new dictionary where the category values we are interested in are converted into keys storing the final values. If we want to summarize by more than one field, we must nest dictionaries so that the combination of values A and C is kept independent of other combinations, for example B and C, and so on. The function below handles this transformation.
In the example below, I'm interested in summarizing my data by two categorical fields: Branch and Customer type. Branch has three possible values, 'A', 'B' and 'C', while Customer type can be 'Member' or 'Normal'. Gross income and Rating are the numerical fields I want to summarize; the input dictionary below represents a single row.
def transform_dict(in_dict, append, index_list, max_level, append_len, level=0):
    if level < max_level:
        # Index level: use the category value itself as a nested key.
        new_key = list(append.values())[level]
        in_dict[new_key] = {}
        level += 1
        transform_dict(in_dict[new_key], append, index_list, max_level, append_len, level)
    elif level < append_len:
        # Value level: store the field name as key and its content as a float.
        new_key = list(append.keys())[level]
        new_value = list(append.values())[level]
        in_dict[new_key] = float(new_value)
        level += 1
        transform_dict(in_dict, append, index_list, max_level, append_len, level)
    return in_dict
print(transform_dict({},
                     {'Branch': 'A', 'Customer type': 'Member', 'gross income': '30.91', 'Rating': '6.6'},
                     ['Branch', 'Customer type'], 2, 4))
{'A': {'Member': {'gross income': 30.91, 'Rating': 6.6}}}
The output of the function stores the gross income of 30.91 and the rating of 6.6 as relative to the category combination A - Member.
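As a further illustration (a sketch added here, not in the original notebook), a single index field works with max_level = 1 and append_len = 3 (one index plus two values):
print(transform_dict({}, {'City': 'Yangon', 'gross income': '30.91', 'Rating': '6.6'},
                     ['City'], 1, 3))
# Expected: {'Yangon': {'gross income': 30.91, 'Rating': 6.6}}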
sum_values - mean_values.¶
These two functions calculate the sum and the average respectively. They are called through a dictionary mapping inside the dict_addition function; depending on the user's parameters, one of the two is dynamically selected.
def sum_values(existing, addition):
    return float(existing + addition)

def mean_values(existing, addition):
    if isinstance(existing, tuple):
        # A tuple means we already track (sum, count, mean) from previous rows.
        sum_val = existing[0] + addition
        count_val = existing[1] + 1
    else:
        # First accumulation: 'existing' is still a bare value.
        sum_val = existing + addition
        count_val = 2
    mean_val = sum_val / count_val
    return tuple([sum_val, count_val, mean_val])
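A quick sketch (not part of the original notebook) of how mean_values accumulates across repeated calls: the first call receives two bare values, later calls receive the running (sum, count, mean) tuple.
running = mean_values(6.0, 8.0)       # first two values -> (14.0, 2, 7.0)
running = mean_values(running, 10.0)  # running tuple plus a value -> (24.0, 3, 8.0)
print(running)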
dict_addition.¶
Going back to the output of the transform_dict function above, the next step to get our summarized data is to store the accumulation of values in a final dictionary that represents the whole table. The function below handles that task.
def dict_addition(in_dict, append, values_dict, append_len, level=0):
    select_operation = {'sum': sum_values, 'mean': mean_values}
    level_key = list(append.keys())[0]
    value_col = list(values_dict.keys())
    # 'in' is used rather than .get(..., False) != False so that a stored
    # value of 0 is not mistaken for a missing key.
    if level_key in in_dict and level_key not in value_col:
        # Key exists and is an index level: inspect the next level.
        level += 1
        dict_addition(in_dict[level_key], append[level_key], values_dict, append_len, level=level)
    elif level_key in in_dict and level_key in value_col and level < append_len:
        # Key exists and is a value field: accumulate.
        for indx, key in enumerate(in_dict):
            in_dict[key] = select_operation[values_dict[key]](in_dict[key], append[list(in_dict.keys())[indx]])
    else:
        # Key does not exist yet: append the whole sub-dictionary.
        in_dict[level_key] = append[level_key]
    return
The example below shows four dictionaries having the same structure as the output of the transform_dict function; out_file represents our final summary. We expect the two entries having A - Member (items 1 and 3) to be summarized as gross income = 70, while A - Normal and B - Normal, being single entries, should have values of 20 and 10 respectively. Since A - Member and A - Normal belong to branch A, these two should be part of the branch = 'A' dictionary, while branch = 'B' should only hold the customer type = 'Normal' value.
Note that the dictionary {'gross income': 'sum'} picks the sum_values function described earlier.
out_file = {}
for row in [{'A': {'Member': {'gross income': 30}}},
            {'B': {'Normal': {'gross income': 10}}},
            {'A': {'Member': {'gross income': 40}}},
            {'A': {'Normal': {'gross income': 20}}}]:
    dict_addition(out_file, row, {'gross income': 'sum'}, 4)
print(out_file)
{'A': {'Member': {'gross income': 70.0}, 'Normal': {'gross income': 20}}, 'B': {'Normal': {'gross income': 10}}}
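As an extra sketch (not in the original example), the same function can mix operations, summing gross income while averaging Rating; note that Rating ends up as the (sum, count, mean) tuple produced by mean_values:
out_file = {}
for row in [{'A': {'gross income': 30.0, 'Rating': 6.0}},
            {'A': {'gross income': 40.0, 'Rating': 8.0}}]:
    dict_addition(out_file, row, {'gross income': 'sum', 'Rating': 'mean'}, 3)
print(out_file)
# Expected: {'A': {'gross income': 70.0, 'Rating': (14.0, 2, 7.0)}}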
filter_values.¶
This function analyses each row and decides whether it should be excluded based on criteria provided by the user. The values in filter_dict are expected to be lists, one list of excluded values per column, so that multi-character values are compared whole rather than character by character.
def filter_values(split_line, filter_dict, col_mapping):
    # Returns True when the row matches any excluded value, i.e. it should be skipped.
    for col in filter_dict:
        for val in filter_dict[col]:
            if split_line[col_mapping[col]] == val:
                return True
    return False
In the example below, we decide to exclude branch = 'A' (passed as a list), which matches the input row (second value in the first input parameter). We expect the function to return True, meaning that the row should be excluded.
print(filter_values(['849-09-3807', 'A', 'Yangon', 'Member', 'Female', 'Fashion accessories', '88.34', '7', '30.919', '649.299', '2/18/2019', '13:28', 'Cash', '618.38', '4.761904762', '30.919', '6.6'],
                    {'Branch': ['A']},
                    {'Invoice ID': 0, 'Branch': 1, 'City': 2, 'Customer type': 3, 'Gender': 4, 'Product line': 5, 'Unit price': 6, 'Quantity': 7, 'Tax 5%': 8, 'Total': 9, 'Date': 10, 'Time': 11, 'Payment': 12, 'cogs': 13, 'gross margin percentage': 14, 'gross income': 15, 'Rating': 16}))
True
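A small additional sketch (not from the original notebook): several excluded values can be given per column, and any match leads to exclusion.
row = ['849-09-3807', 'B', 'Yangon', 'Member']
mapping = {'Invoice ID': 0, 'Branch': 1, 'City': 2, 'Customer type': 3}
print(filter_values(row, {'Branch': ['A', 'B']}, mapping))  # True: branch 'B' is excluded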
adjust_line.¶
This auxiliary function identifies and fixes rows where the delimiter character (for example, a comma ',') appears between a pair of quoting characters and could therefore break the column structure of the csv. The function is necessary given that we are not importing any library that would handle this for us.
def adjust_line(line, quoting_char='"', sep=','):
    max_indx = len(line)
    fixed_line = ''
    # Collect the positions of every quoting character.
    indx_list = []
    for indx, character in enumerate(line):
        if character == quoting_char:
            indx_list.append(indx)
    if len(indx_list) == 0:
        return line
    # Pair consecutive quote positions into (open, close) spans.
    paired_list = []
    for pair in range(0, len(indx_list), 2):
        paired_list.append(tuple([indx_list[pair], indx_list[pair + 1]]))
    # Split the line into alternating unquoted/quoted sections.
    line_sections = []
    line_pos = 0
    for indx, section in enumerate(paired_list):
        line_sections.append(tuple([line_pos, section[0] - 1]))
        line_sections.append(section)
        line_pos = section[1] + 1
        if indx == len(paired_list) - 1:
            line_sections.append(tuple([line_pos, max_indx]))
    # Remove the separator only inside quoted sections.
    for section in line_sections:
        if section in paired_list:
            fixed_line = fixed_line + line[section[0]:section[1] + 1].replace(sep, '')
        else:
            fixed_line = fixed_line + line[section[0]:section[1] + 1]
    return fixed_line
The code below shows two examples of csv rows represented as strings. The first one poses no issue, since none of its values contains a comma and that character only delimits the values; the second row has the problem in its last column. adjust_line removes the conflicting character.
print(adjust_line('849-09-3807, A, Yangon, Member, Female, Fashion accessories'))
print(adjust_line('849-09-3807, A, Yangon, Member, Female,"Food, kitchen and Cooking"'))
849-09-3807, A, Yangon, Member, Female, Fashion accessories
849-09-3807, A, Yangon, Member, Female,"Food kitchen and Cooking"
The cell below shows how the string is parsed differently depending on the use of adjust_line.
print(adjust_line('849-09-3807, A, Yangon, Member, Female,"Food, kitchen and Cooking"').split(','))
print('849-09-3807, A, Yangon, Member, Female,"Food, kitchen and Cooking"'.split(','))
['849-09-3807', ' A', ' Yangon', ' Member', ' Female', '"Food kitchen and Cooking"']
['849-09-3807', ' A', ' Yangon', ' Member', ' Female', '"Food', ' kitchen and Cooking"']
csv2summary.¶
This function wraps up everything presented earlier. After opening the target file, it defines the headers (which may or may not be the first row of the file), checks whether each line needs to be fixed with adjust_line, and decides whether the row should be excluded with filter_values. If the line is relevant to the summary, it gathers the relevant values with transform_dict, accumulates the results with dict_addition, and finally returns the output.
def csv2summary(input_file, sep=',', quoting_char='"', index_list=[], values_dict={},
                filter_dict={}, known_headers=[], if_headers=True, if_adjust=False,
                encode='utf-8'):
    out_file = {}
    max_level = len(index_list)
    append_len = max_level + len(values_dict)
    with open(input_file, 'r', encoding=encode) as in_file:
        # Headers either come from the first row or are supplied by the user.
        if if_headers:
            headers = tuple(in_file.readline().strip().split(sep))
        else:
            headers = tuple(known_headers)
        col_mapping = {col: indx for indx, col in enumerate(headers)}
        for line in in_file:
            line_keys = {}
            if if_adjust:
                split_line = adjust_line(line.strip(), quoting_char, sep).split(sep)
            else:
                split_line = line.strip().split(sep)
            # Skip rows matching the user's exclusion criteria.
            if filter_values(split_line, filter_dict, col_mapping):
                continue
            # Collect the index values first, then the values to accumulate.
            for indx_level in index_list:
                line_keys[indx_level] = split_line[col_mapping[indx_level]]
            for value in values_dict:
                line_keys[value] = split_line[col_mapping[value]]
            adjusted_keys = transform_dict({}, line_keys, index_list, max_level, append_len)
            dict_addition(out_file, adjusted_keys, values_dict, append_len)
    return out_file
The following examples show the output of the function using different levels of analysis.
csv2summary('supermarket_sales.csv', index_list = ['City'], values_dict = {'gross income':'sum'})
{'Yangon': {'gross income': 5057.160500000002}, 'Naypyitaw': {'gross income': 5265.176500000002}, 'Mandalay': {'gross income': 5057.032000000003}}
csv2summary('supermarket_sales.csv', index_list = ['City', 'Gender'], values_dict = {'gross income':'sum'})
{'Yangon': {'Female': {'gross income': 2536.6269999999995}, 'Male': {'gross income': 2520.5335}}, 'Naypyitaw': {'Female': {'gross income': 2937.403000000002}, 'Male': {'gross income': 2327.7735000000007}}, 'Mandalay': {'Female': {'gross income': 2520.395000000001}, 'Male': {'gross income': 2536.637}}}
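As a further sketch (an illustrative call added here; its output is not reproduced), the remaining parameters can be combined, for example averaging the Rating while excluding branch 'A' and repairing quoted delimiters:
csv2summary('supermarket_sales.csv',
            index_list=['City'],
            values_dict={'Rating': 'mean'},
            filter_dict={'Branch': ['A']},
            if_adjust=True)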
Testing¶
This part is divided into three:
- Convert results to pandas. We'll declare auxiliary functions to convert our output to a format pandas can read; the purpose is to make the comparison easier.
- Accuracy test. This test compares our summary results against a grouped pandas dataframe and confirms the amounts agree.
- Performance vs large files. This test compares processing time and memory usage against a pandas dataframe.
import pandas as pd
Convert results to pandas¶
The following functions transform the csv2summary output into a MultiIndex-style dictionary format for pandas.
def inspect_item(csvsummary, output, target, subsummary, level=0, key_index=[]):
    # Walks the nested summary while tracking the current chain of index keys.
    if level == 0:
        key_index.append(subsummary)
    for item in csvsummary:
        if isinstance(csvsummary[item], dict):
            level += 1
            if len(key_index) == level:
                key_index.append(item)
            else:
                key_index[level] = item
            inspect_item(csvsummary[item], output, target, item, level, key_index)
            level -= 1
        elif item == target:
            # Leaf reached: record the value under its full key tuple.
            value = csvsummary[item]
            output[tuple(key_index)] = value
    key_index.pop()
def csvsummary2pandas(csvsummary, target_list):
    output = {}
    for target in target_list:
        output[target] = {}
        for subsummary in csvsummary:
            inspect_item(csvsummary[subsummary], output[target], target, subsummary, level=0, key_index=[])
    return output
csvsummary2pandas(csv2summary('supermarket_sales.csv', index_list = ['City', 'Gender'], values_dict = {'gross income':'sum'}), ['gross income'])
{'gross income': {('Yangon', 'Female'): 2536.6269999999995, ('Yangon', 'Male'): 2520.5335, ('Naypyitaw', 'Female'): 2937.403000000002, ('Naypyitaw', 'Male'): 2327.7735000000007, ('Mandalay', 'Female'): 2520.395000000001, ('Mandalay', 'Male'): 2536.637}}
Accuracy Test¶
The following three examples consist of:
- Using csv2summary, each example with a different configuration; the output is converted so pandas can read it.
- Using pandas.read_csv and then .groupby to get an equivalent result.
- Confirming the results are the same by merging both dataframes and checking them side by side.
We are able to confirm the expected result: both methods lead to the same outcome.
test = pd.DataFrame.from_dict(csvsummary2pandas(csv2summary('supermarket_sales.csv', index_list = ['City', 'Gender'], values_dict = {'gross income':'sum'}), ['gross income'])).sort_index()
test
|           |        | gross income |
|-----------|--------|--------------|
| Mandalay  | Female | 2520.3950    |
| Mandalay  | Male   | 2536.6370    |
| Naypyitaw | Female | 2937.4030    |
| Naypyitaw | Male   | 2327.7735    |
| Yangon    | Female | 2536.6270    |
| Yangon    | Male   | 2520.5335    |
pandas_df = pd.read_csv('supermarket_sales.csv').groupby(['City', 'Gender']).agg({'gross income': 'sum'}).sort_index()
pandas_df
| City      | Gender | gross income |
|-----------|--------|--------------|
| Mandalay  | Female | 2520.3950    |
| Mandalay  | Male   | 2536.6370    |
| Naypyitaw | Female | 2937.4030    |
| Naypyitaw | Male   | 2327.7735    |
| Yangon    | Female | 2536.6270    |
| Yangon    | Male   | 2520.5335    |
pandas_df.merge(test, right_index=True, left_on=['City', 'Gender'])
| City      | Gender | gross income_x | gross income_y |
|-----------|--------|----------------|----------------|
| Mandalay  | Female | 2520.3950      | 2520.3950      |
| Mandalay  | Male   | 2536.6370      | 2536.6370      |
| Naypyitaw | Female | 2937.4030      | 2937.4030      |
| Naypyitaw | Male   | 2327.7735      | 2327.7735      |
| Yangon    | Female | 2536.6270      | 2536.6270      |
| Yangon    | Male   | 2520.5335      | 2520.5335      |
test = pd.DataFrame.from_dict(csvsummary2pandas(csv2summary('supermarket_sales.csv', index_list=['Branch', 'Customer type'], values_dict={'gross income': 'sum'}), ['gross income'])).sort_index()
pandas_df = pd.read_csv('supermarket_sales.csv').groupby(['Branch', 'Customer type']).agg({'gross income': 'sum'})
pandas_df.merge(test, right_index=True, left_on=['Branch', 'Customer type'])
| Branch | Customer type | gross income_x | gross income_y |
|--------|---------------|----------------|----------------|
| A      | Member        | 2554.1655      | 2554.1655      |
| A      | Normal        | 2502.9950      | 2502.9950      |
| B      | Member        | 2557.3660      | 2557.3660      |
| B      | Normal        | 2499.6660      | 2499.6660      |
| C      | Member        | 2708.6325      | 2708.6325      |
| C      | Normal        | 2556.5440      | 2556.5440      |
test = pd.DataFrame.from_dict(csvsummary2pandas(csv2summary('supermarket_sales.csv', index_list=['Branch', 'Gender', 'Product line'], values_dict={'gross income': 'sum'}), ['gross income'])).sort_index()
pandas_df = pd.read_csv('supermarket_sales.csv').groupby(['Branch', 'Gender', 'Product line']).agg({'gross income': 'sum'})
pandas_df.merge(test, right_index=True, left_on=['Branch', 'Gender', 'Product line'])
| Branch | Gender | Product line           | gross income_x | gross income_y |
|--------|--------|------------------------|----------------|----------------|
| A      | Female | Electronic accessories | 474.5855       | 474.5855       |
| A      | Female | Fashion accessories    | 468.3915       | 468.3915       |
| A      | Female | Food and beverages     | 333.3220       | 333.3220       |
| A      | Female | Health and beauty      | 272.1380       | 272.1380       |
| A      | Female | Home and lifestyle     | 601.7530       | 601.7530       |
| A      | Female | Sports and travel      | 386.4370       | 386.4370       |
| A      | Male   | Electronic accessories | 397.6580       | 397.6580       |
| A      | Male   | Fashion accessories    | 309.3470       | 309.3470       |
| A      | Male   | Food and beverages     | 483.9685       | 483.9685       |
| A      | Male   | Health and beauty      | 327.7550       | 327.7550       |
| A      | Male   | Home and lifestyle     | 465.7325       | 465.7325       |
| A      | Male   | Sports and travel      | 536.0725       | 536.0725       |
| B      | Female | Electronic accessories | 388.8815       | 388.8815       |
| B      | Female | Fashion accessories    | 432.4520       | 432.4520       |
| B      | Female | Food and beverages     | 500.4760       | 500.4760       |
| B      | Female | Health and beauty      | 304.7785       | 304.7785       |
| B      | Female | Home and lifestyle     | 455.1015       | 455.1015       |
| B      | Female | Sports and travel      | 438.7055       | 438.7055       |
| B      | Male   | Electronic accessories | 423.0920       | 423.0920       |
| B      | Male   | Fashion accessories    | 349.1345       | 349.1345       |
| B      | Male   | Food and beverages     | 224.0425       | 224.0425       |
| B      | Male   | Health and beauty      | 646.6815       | 646.6815       |
| B      | Male   | Home and lifestyle     | 380.5730       | 380.5730       |
| B      | Male   | Sports and travel      | 513.1135       | 513.1135       |
| C      | Female | Electronic accessories | 427.1055       | 427.1055       |
| C      | Female | Fashion accessories    | 548.5565       | 548.5565       |
| C      | Female | Food and beverages     | 745.7695       | 745.7695       |
| C      | Female | Health and beauty      | 306.9400       | 306.9400       |
| C      | Female | Home and lifestyle     | 373.4730       | 373.4730       |
| C      | Female | Sports and travel      | 535.5585       | 535.5585       |
| C      | Male   | Electronic accessories | 476.1790       | 476.1790       |
| C      | Male   | Fashion accessories    | 478.1135       | 478.1135       |
| C      | Male   | Food and beverages     | 385.9855       | 385.9855       |
| C      | Male   | Health and beauty      | 484.2660       | 484.2660       |
| C      | Male   | Home and lifestyle     | 288.2200       | 288.2200       |
| C      | Male   | Sports and travel      | 215.0095       | 215.0095       |
Performance vs large files.¶
For this test I used a modified version of the csv file I've been using for demonstration: the data has been duplicated multiple times to reach 12,000,000 records, for a file size of 1.46 GB. The original file has only 1,000 records and a size of 128 KB.
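A minimal sketch of how such a file could be produced (an assumption on my part, not necessarily the exact script used; only the record counts above are taken from the text):
# Duplicate the original 1,000 data rows 12,000 times -> 12,000,000 records.
with open('supermarket_sales.csv', 'r', encoding='utf-8') as src:
    header = src.readline()
    rows = src.readlines()
with open('supermarket_sales_mod.csv', 'w', encoding='utf-8') as dst:
    dst.write(header)
    for _ in range(12000):
        dst.writelines(rows)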
The following cells show that, at the expense of CPU time, csv2summary causes a minimal increment in memory usage compared to a direct use of pandas (low_memory=False), which is the expected result.
import setuptools
%load_ext memory_profiler
%%time
%memit csv2summary('supermarket_sales_mod.csv', index_list = ['City','Product line'], values_dict = {'gross income':'sum'})
peak memory: 110.80 MiB, increment: 0.26 MiB
CPU times: total: 1min 2s
Wall time: 1min 3s
%%time
%memit pd.read_csv('supermarket_sales_mod.csv',low_memory=False).groupby(['City','Product line']).agg({'gross income':'sum'})
peak memory: 6438.32 MiB, increment: 6327.55 MiB
CPU times: total: 33.3 s
Wall time: 36.3 s
The following cell modifies the memory usage behavior in pandas by setting low_memory to True.
%%time
%memit pd.read_csv('supermarket_sales_mod.csv', low_memory=True).groupby(['City','Product line']).agg({'gross income':'sum'})
peak memory: 5549.17 MiB, increment: 5438.44 MiB
CPU times: total: 23.5 s
Wall time: 26.7 s
Conclusion.¶
Even with multiple limitations, the proposed approach is a viable way to summarize data when memory is limited, at the cost of greater processing time.